Python Multithreading

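  • A thread is a lightweight process and the smallest unit of scheduling; a process is the smallest unit of resource allocation.
  • A standard thread consists of a thread ID, the current instruction pointer (PC), a register set, and a stack.
  • A thread shares all resources owned by its process with the other threads of that process.
  • A thread can create and terminate another thread.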
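The resource sharing described in the list above can be seen in a few lines. This is a minimal sketch rather than code from the original article; the names `shared` and `worker` are made up for illustration.

```python
import threading

# A list owned by the process; every thread in the process sees the same object.
shared = []

def worker(tag):
    # Appends to the one shared list; no per-thread copy is made.
    shared.append(tag)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The append order depends on scheduling, so sort before printing.
print(sorted(shared))
```

All three values end up in the same list because the threads share the process's memory; separate processes would each get their own copy.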

Why multithreading matters

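  • Multiple threads in the same process can execute concurrently; this is multithreading.

  • On a single core, separating I/O-bound threads from CPU-bound threads hides I/O blocking and reduces the time the CPU sits idle.

  • On multiple cores, if both the operating system and the programming language support parallelism, multithreading enables parallel computation and makes full use of the cores.

    Unfortunately, when CPython, the standard Python interpreter, was first developed, multi-core CPUs were still science fiction. The simplest and cheapest design was to give CPython a single global lock for threads, and that is how the GIL (Global Interpreter Lock) came about. Because of the GIL, threads in CPython cannot execute Python code in parallel on multiple cores.

  • For CPU-bound computation: on a single core, switching between threads only lowers the program's efficiency. One thread on one core, two threads on two cores, and four threads on four cores is the best fit, but that holds only for other languages.

    In CPython, again because of the GIL, only one thread in a process's group of threads can run at any given time.

    In CPython, one thread per process is enough for CPU-bound computation.

  • I/O-bound programs run faster with multiple threads, because a single thread of such a program spends most of its time waiting and does not do enough computation to keep one CPU core busy.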
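The I/O-bound case can be sketched with `time.sleep` standing in for a blocking I/O call (an assumption made so the example runs offline; `sleep` releases the GIL while it waits, much like real blocking I/O). The function names `fake_io`, `run_sequential`, and `run_threaded` are hypothetical.

```python
import threading
import time

def fake_io(seconds):
    # Stand-in for a blocking I/O call; the thread waits without using the CPU.
    time.sleep(seconds)

def run_sequential(n, seconds):
    # Perform n waits one after another and return the elapsed time.
    start = time.perf_counter()
    for _ in range(n):
        fake_io(seconds)
    return time.perf_counter() - start

def run_threaded(n, seconds):
    # Perform n waits in n threads so the waiting overlaps.
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(seconds,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

sequential = run_sequential(4, 0.2)
threaded = run_threaded(4, 0.2)
print("sequential: %.2fs, threaded: %.2fs" % (sequential, threaded))
```

The sequential run takes roughly the sum of the waits, while the threaded run takes roughly one wait. A CPU-bound function would not show this gain in CPython, because the GIL lets only one thread run Python code at a time.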

Implementing multithreading

  1. thread

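    Python's low-level threading interface.

    In Python 3 the thread module was renamed to _thread, which is kept for compatibility; new code should prefer threading.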

    from time import ctime, sleep
    from _thread import start_new_thread

    def loop1():
        print("enter loop1:", ctime())
        sleep(3)
        print("leave loop1:", ctime())

    def loop2():
        print("enter loop2:", ctime())
        sleep(5)
        print("leave loop2:", ctime())

    def main():
        print("main begin:", ctime())
        # Each call starts a new thread running the function with the given argument tuple
        start_new_thread(loop1, ())
        start_new_thread(loop2, ())
        # _thread has no join(), so sleep long enough for both loops to finish
        sleep(8)
        print("main end:", ctime())

    if __name__ == "__main__":
        main()

    ==========================================================
    main begin: Fri Nov 2 15:47:06 2018
    enter loop1: Fri Nov 2 15:47:06 2018
    enter loop2: Fri Nov 2 15:47:06 2018
    leave loop1: Fri Nov 2 15:47:09 2018
    leave loop2: Fri Nov 2 15:47:11 2018
    main end: Fri Nov 2 15:47:14 2018
  2. threading: an object-oriented wrapper around thread operations

    """
    Using the thread objects that Python provides
    """
    # Procedural style
    import threading
    import time
    from time import sleep

    def now():
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

    def test(nloop, nsec):
        print('start loop', nloop, 'at:', now())
        sleep(nsec)
        print('loop', nloop, 'done at:', now())

    def main():
        print('starting at:', now())
        threadpool = []

        for i in range(10):
            # Create the thread object
            th = threading.Thread(target=test, args=(i, 2))
            threadpool.append(th)

        for th in threadpool:
            th.start()

        # Wait for every thread to finish
        for th in threadpool:
            th.join()

        print('all Done at:', now())

    if __name__ == '__main__':
        main()

    ====================================================
    starting at: 2018-11-02 15:46:22
    start loop 0 at: 2018-11-02 15:46:22
    start loop 1 at: 2018-11-02 15:46:22
    start loop 2 at: 2018-11-02 15:46:22
    start loop 3 at: 2018-11-02 15:46:22
    start loop 4 at: 2018-11-02 15:46:22
    start loop 5 at: 2018-11-02 15:46:22
    start loop 6 at: 2018-11-02 15:46:22
    start loop 7 at: 2018-11-02 15:46:22
    start loop 8 at: 2018-11-02 15:46:22
    start loop 9 at: 2018-11-02 15:46:22
    loop 0 done at: 2018-11-02 15:46:24
    loop 1 done at: 2018-11-02 15:46:24
    loop 3 done at: 2018-11-02 15:46:24
    loop 4 done at: 2018-11-02 15:46:24
    loop 5 done at: 2018-11-02 15:46:24
    loop 2 done at: 2018-11-02 15:46:24
    loop 8 done at: 2018-11-02 15:46:24
    loop 9 done at: 2018-11-02 15:46:24
    loop 6 done at: 2018-11-02 15:46:24
    loop 7 done at: 2018-11-02 15:46:24
    all Done at: 2018-11-02 15:46:24
    """
    Custom thread class
    """
    import threading
    import time
    from time import sleep, ctime

    def now():
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

    class myThread(threading.Thread):
        """Thread that sleeps for nsec seconds and reports its start and end."""
        def __init__(self, nloop, nsec):
            super().__init__()
            self.nloop = nloop
            self.nsec = nsec

        def run(self):
            # run() is the method that start() executes in the new thread
            print('start loop', self.nloop, 'at:', ctime())
            sleep(self.nsec)
            print('loop', self.nloop, 'done at:', ctime())

    def main():
        thpool = []
        print('starting at:', now())

        for i in range(10):
            thpool.append(myThread(i, 2))

        for th in thpool:
            th.start()

        for th in thpool:
            th.join()

        print('all Done at:', now())

    if __name__ == '__main__':
        main()

    ===========================================
    starting at: 2018-11-02 15:49:42
    start loop 0 at: Fri Nov 2 15:49:42 2018
    start loop 1 at: Fri Nov 2 15:49:42 2018
    start loop 2 at: Fri Nov 2 15:49:42 2018
    start loop 3 at: Fri Nov 2 15:49:42 2018
    start loop 4 at: Fri Nov 2 15:49:42 2018
    start loop 5 at: Fri Nov 2 15:49:42 2018
    start loop 6 at: Fri Nov 2 15:49:42 2018
    start loop 7 at: Fri Nov 2 15:49:42 2018
    start loop 8 at: Fri Nov 2 15:49:42 2018
    start loop 9 at: Fri Nov 2 15:49:42 2018
    loop 0 done at: Fri Nov 2 15:49:44 2018
    loop 1 done at: Fri Nov 2 15:49:44 2018
    loop 2 done at: Fri Nov 2 15:49:44 2018
    loop 4 done at: Fri Nov 2 15:49:44 2018
    loop 3 done at: Fri Nov 2 15:49:44 2018
    loop 6 done at: Fri Nov 2 15:49:44 2018
    loop 5 done at: Fri Nov 2 15:49:44 2018
    loop 7 done at: Fri Nov 2 15:49:44 2018
    loop 9 done at: Fri Nov 2 15:49:44 2018
    loop 8 done at: Fri Nov 2 15:49:44 2018
    all Done at: 2018-11-02 15:49:44
  3. threadpool: thread pool

    A third-party library that must be installed first.

    pip install threadpool

    import threadpool
    import time
    import urllib.request

    urls = [
        'http://www.baidu.com',
        'http://www.360.com',
        'http://www.2345.com',
        'http://www.jd.com',
        'http://www.taobao.com',
    ]

    def myRequest(url):
        resp = urllib.request.urlopen(url)
        print(url, resp.getcode())

    def timeCost(request, n):
        # Callback run after each request finishes; prints the time since start
        print("Elapsed time: %s" % (time.time() - start))

    start = time.time()
    pool = threadpool.ThreadPool(5)
    reqs = threadpool.makeRequests(myRequest, urls, timeCost)
    for req in reqs:
        pool.putRequest(req)
    pool.wait()
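The standard library also ships a thread pool, `concurrent.futures.ThreadPoolExecutor`, so the same pattern can be written without a third-party package. The sketch below is a swapped-in alternative, not the article's method, and it replaces the real HTTP request with a hypothetical `fake_fetch` placeholder so that it runs offline.

```python
import time
from concurrent.futures import ThreadPoolExecutor

urls = [
    'http://www.baidu.com',
    'http://www.jd.com',
    'http://www.taobao.com',
]

def fake_fetch(url):
    # Placeholder for a real request such as urllib.request.urlopen(url).
    time.sleep(0.1)
    return url, len(url)

start = time.time()
with ThreadPoolExecutor(max_workers=3) as pool:
    # map() runs fake_fetch in the pool's threads and yields results in input order.
    results = list(pool.map(fake_fetch, urls))
elapsed = time.time() - start

for url, size in results:
    print(url, size)
print("Elapsed time: %.2f" % elapsed)
```

Leaving the `with` block waits for all submitted work to finish, which plays the role of `pool.wait()` in the threadpool version.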


## threading: thread synchronization

### threading.Lock objects
A mutual-exclusion lock (mutex).
It provides the acquire() and release() methods.

```python
from threading import Lock, Thread

lock = Lock()
some_var = 0

class IncrementThread(Thread):
    def run(self):
        # Read a global variable and then increment it
        global some_var
        # Only one thread at a time may run the code between acquire() and release()
        lock.acquire()
        read_value = some_var
        print("some_var in %s is %d" % (self.name, read_value))
        some_var = read_value + 1
        print("some_var in %s after increment is %d" % (self.name, some_var))
        lock.release()

def use_increment_thread():
    threads = []
    for i in range(50):
        t = IncrementThread()
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print("After 50 modifications, some_var should have become 50")
    print("After 50 modifications, some_var is %d" % (some_var,))

use_increment_thread()
```
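
A Lock can also be used as a context manager, which releases it even if the guarded code raises an exception. The following is a minimal sketch of that style; `counter`, `counter_lock`, and `increment` are illustrative names, not part of the example above.

```python
from threading import Lock, Thread

counter = 0
counter_lock = Lock()

def increment(times):
    global counter
    for _ in range(times):
        # "with" acquires the lock on entry and releases it on exit.
        with counter_lock:
            counter += 1

threads = [Thread(target=increment, args=(1000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
```

Because every read-modify-write of `counter` happens while the lock is held, the final value equals the total number of increments (5 threads × 1000).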

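### threading.Semaphore

A semaphore is a counting lock.
When creating one, you can pass an integer as the initial counter value, which caps how many threads can hold the semaphore at the same time (sema = threading.Semaphore(5)).
Like Lock, it has acquire() and release() methods.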
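
As a rough illustration of the counting behavior, the sketch below lets six threads compete for a semaphore created with a count of 2 and records the highest number of threads that were inside the guarded section at once. The names `active`, `max_active`, and `state_lock` are assumptions made for this example.

```python
import threading
import time

sema = threading.Semaphore(2)   # at most 2 threads inside the guarded section
state_lock = threading.Lock()   # protects the two counters below
active = 0
max_active = 0

def worker():
    global active, max_active
    with sema:                   # acquire(); release() happens on exit
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)         # pretend to use the limited resource
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", max_active)
```

The peak never exceeds the count passed to the constructor; the remaining threads block in `acquire()` until a slot is released.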

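## Thread communication

### Condition

A threading.Condition object is a condition variable. Creating one also gives it an underlying lock (an RLock by default), because a condition variable is always used together with a mutex. Calling acquire() and release() on the Condition object controls that underlying lock. In addition:

  • wait() is analogous to cond_wait() (pthread_cond_wait() in POSIX threads); it releases the lock while waiting and reacquires it before returning.
  • notify_all() is analogous to cond_broadcast() (pthread_cond_broadcast() in POSIX threads).
  • notify() is similar to notify_all(), but wakes only one waiting thread instead of all of them.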
    # Producer-consumer model
    import threading
    import time

    # The product
    product = None
    # Condition variable
    con = threading.Condition()

    # Producer
    def produce():
        global product
        with con:
            while True:
                if product is None:
                    print('produce...')
                    product = 'anything'

                    # Tell the consumer that a product is available
                    con.notify()

                # Wait for a notification (releases the lock while waiting)
                con.wait()
                time.sleep(2)

    # Consumer
    def consume():
        global product
        with con:
            while True:
                if product is not None:
                    print('consume...')
                    product = None

                    # Tell the producer that the product is gone
                    con.notify()

                # Wait for a notification (releases the lock while waiting)
                con.wait()
                time.sleep(2)

    # Both loops run forever; stop the program with Ctrl+C
    t1 = threading.Thread(target=produce)
    t2 = threading.Thread(target=consume)
    t2.start()
    t1.start()
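
For comparison, here is a finite variant of the same hand-off that terminates on its own. It is a sketch under a few assumptions: a single-item `slot`, a fixed list of items, and `Condition.wait_for()`, which re-checks a predicate after every wake-up. The names `slot`, `consumed`, `producer`, and `consumer` are illustrative.

```python
import threading

cond = threading.Condition()
slot = None       # holds at most one item at a time
consumed = []     # items in the order the consumer received them

def producer(items):
    global slot
    for item in items:
        with cond:
            # Block until the slot is empty, then fill it.
            cond.wait_for(lambda: slot is None)
            slot = item
            cond.notify_all()

def consumer(count):
    global slot
    for _ in range(count):
        with cond:
            # Block until the slot holds an item, then take it.
            cond.wait_for(lambda: slot is not None)
            consumed.append(slot)
            slot = None
            cond.notify_all()

items = ['a', 'b', 'c']
t_prod = threading.Thread(target=producer, args=(items,))
t_cons = threading.Thread(target=consumer, args=(len(items),))
t_cons.start()
t_prod.start()
t_prod.join()
t_cons.join()

print(consumed)
```

Checking the predicate inside `wait_for()` guards against waking up when the state has not actually changed, which a bare `wait()` does not do by itself.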

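### threading.Event

Similar to threading.Condition, an Event is roughly a condition variable without the underlying Lock protecting it.
The object has two states, True and False.
Any number of threads can call wait() and block until some thread calls the object's set() method, which sets it to True.
A thread can call the object's clear() method to reset it to False.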

    import threading
    import time

    event = threading.Event()

    def func():
        # Wait for the event; the thread blocks here
        print('%s wait for event...' % threading.current_thread().name)
        event.wait()
        # Once the event is set, the thread resumes running
        print('%s recv event.' % threading.current_thread().name)

    t1 = threading.Thread(target=func)
    t2 = threading.Thread(target=func)
    t1.start()
    t2.start()

    time.sleep(2)

    # Send the event notification
    print('MainThread set event.')
    event.set()
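
The clear() method and the return value of wait() are not exercised above. The short sketch below shows both, using an optional timeout so that a wait on a cleared event does not block forever; `waiter`, `results`, and `timed_out` are names invented for this example.

```python
import threading

event = threading.Event()
results = []

def waiter():
    # wait() returns True once the flag is set, or False if the timeout expires.
    results.append(event.wait(timeout=5))

t = threading.Thread(target=waiter)
t.start()
event.set()          # wake the waiting thread
t.join()

print(results)
print(event.is_set())

event.clear()        # reset the flag to False
print(event.is_set())

# Nothing sets the event again, so this wait gives up after the timeout.
timed_out = event.wait(timeout=0.1)
print(timed_out)
```

After clear(), later calls to wait() block again until the next set(), which makes one Event reusable across several rounds of signaling.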